tokio-style broadcast channels #229
tractor/_live_from_tokio.py (Outdated)

```python
# their latest available value.
for sub_key, seq in self._subs.items():

    if key == sub_key:
```
maybe do a `._subs` copy and pop instead?
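A hedged sketch of what that could look like (`self._subs` and `key` are from the diff above; the increment direction is an assumption based on the surrounding sequence logic):

```python
# copy-and-pop: skip this task's own entry without an
# `if key == sub_key` check on every iteration
subs = self._subs.copy()
subs.pop(key)  # this task already retrieved the latest value

for sub_key, seq in subs.items():
    # bump every *other* subscriber's sequence counter
    self._subs[sub_key] = seq + 1
```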
force-pushed from 397b34b to 59354b4
There. I think this is ready to start adding tests. I'll leave it for a couple hours to see if any of you lurkers complain 😂
force-pushed from 8a58df3 to 898756a
tractor/_broadcast.py (Outdated)

```python
# ``AsyncResource`` api.
await clone.aclose()

# TODO:
```
Last set of design questions, I think?
tractor/_broadcast.py (Outdated)

```python
# don't decrement the sequence # for this task since we
# already retrieved the last value
subs.pop(key)
for sub_key, seq in subs.items():
```
is there some fancier-faster way to do something like this?
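A couple of candidates (sketches only; `subs`, `key` and `self._subs` are per the diffs above, and a later commit message here mentions trying `filter()`):

```python
# (1) one-pass comprehension over the popped copy, written back
self._subs.update(
    {sub_key: seq + 1 for sub_key, seq in subs.items()}
)

# (2) or skip the copy/pop entirely with a ``filter()`` over the
# keys; mutating values (not keys) during iteration is safe
for sub_key in filter(lambda k: k != key, self._subs):
    self._subs[sub_key] += 1
```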
tractor/_broadcast.py (Outdated)

```python
else:
    await self._value_received.wait()

seq = self._subs[key]
```
can we drop all of these sequence # checks?
force-pushed from 898756a to 2bfea0b
lol, yeah so since moving to the "clones" version the actual N-copies consistency was broken, obviously, because the … Fixed this and now am adding a "broadcast state" type much like the default mem chans.
tokio-style broadcast channel prototype
Yah so binance feeds proved that the assumption that slept consumers waking up can expect to read the … We can maybe try this down the road if we invert the indexing?
tractor/_streaming.py (Outdated)

```python
if self._broadcaster is None:
    self._broadcaster = broadcast_receiver(
        self,
        self._rx_chan._state.max_buffer_size,
```
This presumes a `trio.MemoryReceiveChannel`, which is not ideal; so what would be the best default here and how would we reflect it in typing?
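One possible answer (a hedged sketch, not a settled design) is a structural type via `typing.Protocol`, so we don't hard-code `trio.MemoryReceiveChannel` at all:

```python
from typing import Protocol


class BufferedReceiveChannel(Protocol):
    '''Hypothetical structural type: anything exposing an async
    ``.receive()`` plus a known max buffer size would satisfy it,
    removing the hard dependency on ``trio.MemoryReceiveChannel``.

    '''
    max_buffer_size: int

    async def receive(self) -> object:
        ...
```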
tractor/_streaming.py (Outdated)

```python
# of the ``BroadcastReceiver`` before calling
# this method for the first time.

# XXX: why does this work without a recursion issue?!
```
This was broken actually, but is now fixed by adding support for passing a `recv_afunc` kwarg into `BroadcastReceiver`, which defaults to the underlying's `.receive()` if not provided.
This makes it easy to wrap receive channel APIs for use in task broadcasting without expecting any initial consumer task of the underlying to also use a broadcast receiver instance when calling `.receive()`; it does however mutate the machinery to put the broadcaster behind the scenes, as it were.
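In sketch form (the signature details are my assumption, not the exact code):

```python
from typing import Awaitable, Callable, Optional


class BroadcastReceiver:
    def __init__(
        self,
        rx_chan,  # the wrapped underlying receive channel
        recv_afunc: Optional[Callable[[], Awaitable[object]]] = None,
    ) -> None:
        # default to the wrapped channel's own ``.receive()`` so no
        # consumer of the underlying needs to know we're in the middle
        self._recv = recv_afunc or rx_chan.receive

    async def receive(self):
        # broadcast bookkeeping would wrap this call
        return await self._recv()
```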
Oh right, to use …
changed the title from tokio-style broadcast channel prototype to tokio-style broadcast channels
force-pushed from 7a3a587 to 2eb7715
tests/test_local_task_broadcast.py (Outdated)

```python
retries = 3
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
```
I'm almost tempted to enforce this as an async context manager, something I kinda wish `open_memory_channel()` did as well.
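Enforcing that could be as simple as only exporting an `@asynccontextmanager` factory (a sketch; `broadcast_receiver()` as used in the test above):

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def open_broadcast_receiver(rx_chan, buffer_size: int):
    # allocation only via ``async with`` makes forgetting to close
    # the receiver impossible by construction
    brx = broadcast_receiver(rx_chan, buffer_size)
    try:
        yield brx
    finally:
        await brx.aclose()
```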
tests/test_local_task_broadcast.py (Outdated)

```python
async with brx.subscribe() as lbrx:
    while True:
        # await tractor.breakpoint()
```
remove.
For every set of broadcast receivers which pull from the same producer, we need a singleton state for all of:
- subscriptions
- the sender ready event
- the queue

Add a `BroadcastState` dataclass for this and pass it to all subscriptions. This makes the design much more like the built-in memory channels, which do something very similar with `MemoryChannelState`. Use a `filter()` on the subs list in the sequence update step, plus some other commented approaches we can try for speed.
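A minimal sketch of such a state type (the field names are my guesses from the commit message above, not the merged code):

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Dict

import trio


@dataclass
class BroadcastState:
    # ring buffer of values shared by every subscription
    queue: deque

    # subscriber key -> sequence number, i.e. how far behind
    # that subscriber currently is in ``queue``
    subs: Dict[str, int] = field(default_factory=dict)

    # set by the sender when a new value lands in ``queue``
    recv_ready: trio.Event = field(default_factory=trio.Event)
```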
Add `ReceiveMsgStream.subscribe()` which allows allocating a broadcast receiver around the stream for use by multiple actor-local consumer tasks. Entering this context manager idempotently mutates the stream's receive machinery, which for now cannot be undone. Move `.clone()` to the receive stream type. Resolves #204
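Consumer-side usage would look roughly like this (a sketch; `stream` is assumed to be an existing `ReceiveMsgStream`, and async-iteration support on the receiver is presumed):

```python
import trio


async def fan_out(stream):
    # hypothetical: N actor-local tasks all consuming one stream
    async with trio.open_nursery() as n:

        async def consume(name: str):
            # each entry allocates (or reuses) the stream's broadcaster
            async with stream.subscribe() as brx:
                async for value in brx:
                    print(f'{name} got {value}')

        for i in range(3):
            n.start_soon(consume, f'task_{i}')
```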
This allows for wrapping an existing stream by re-assigning its receive method to the allocated broadcaster's `.receive()`, so as to avoid expecting any original consumer(s) of the stream to know about the broadcaster; instead this mutates the stream to delegate to the new receive call behind the scenes any time `.subscribe()` is called. Add a `typing.Protocol` for so-called "cloneable channels" until we decide/figure out a better keying system for each subscription, and mask all undesired typing failures.
Get rid of all the (requirements for) clones of the underlying receivable. We can just use a uuid-generated key for each instance (thinking now this can probably just be `id(self)`). I'm fully convinced now that channel cloning is only a source of confusion and anti-patterns when we already have nurseries to define resource lifetimes. There is no benefit in particular when you allocate subscriptions using a context manager (not sure why `trio.open_memory_channel()` doesn't enforce this).

Further refinements:
- add a `._closed` state that will error the receiver on reuse
- drop the module script section; it's been moved to a real test
- call the "receiver" duck-type stub a new name
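The `._closed` reuse guard could be as small as (a sketch; the exact error type is an assumption):

```python
# inside ``BroadcastReceiver`` (sketch):
async def receive(self):
    if self._closed:
        # any task reusing a receiver that was torn down errors
        raise trio.ClosedResourceError(
            'this broadcast receiver was already closed'
        )
    ...
```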
Add a couple more tests to check that a parent and sub-task stream can be lagged and recovered (depending on who's slower). Factor some of the test machinery into a new ctx mngr to make it all happen.
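The lag scenario those tests exercise boils down to something like this (a heavily trimmed sketch; `broadcast_receiver()` per the diffs above, and the overrun-error name `Lagged` is my assumption from the tokio analogy, not necessarily the real API):

```python
import trio


async def check_slow_subscriber():
    size = 4
    tx, rx = trio.open_memory_channel(size)
    brx = broadcast_receiver(rx, size)

    async with trio.open_nursery() as n:

        async def fast():
            # drains promptly, advancing the shared ring buffer
            async with brx.subscribe() as sub:
                async for _ in sub:
                    pass

        n.start_soon(fast)

        async with brx.subscribe() as slow:
            # overrun the ring while this subscriber never reads
            for i in range(size * 2):
                await tx.send(i)

            try:
                await slow.receive()
            except Lagged:  # hypothetical: fell behind the buffer
                print('slow subscriber lagged')

        n.cancel_scope.cancel()
```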
The `collections.deque` takes care of array length truncation of values for us implicitly, but in the future we'll likely want this value exposed to alternate array implementations. This patch provides for that as well as making `mypy` happy, since `deque.maxlen` can also be `None`.
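For reference, the implicit truncation behavior `deque` gives us:

```python
from collections import deque

q = deque(maxlen=3)
for i in range(5):
    # appends past ``maxlen`` silently drop from the opposite end,
    # i.e. the oldest value is discarded on overflow
    q.appendleft(i)

print(q)         # deque([4, 3, 2], maxlen=3)
print(q.maxlen)  # 3; this is ``None`` for an unbounded deque
```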
force-pushed from ca15098 to 1137a9e
~~Literally the most prototype thing ever~~ actually decently well tested now, but based on discussion in: …

`rust` has …

~~Nothing more than a runnable module atm..~~

Some interesting notes:
- … (`deque.appendleft()`) where the oldest value is just discarded; for some examples:
- `ringbuf`'s handling of overflow is much the same …
- … `ringbuf` impl …
implUpdate:
before i lose this thought...
If we wanted to also support slowest-consumer style, I'm pretty sure there's an easy hack to just inherit and proxy through reads to `trio._channel.MemoryChannelState.data` (presuming the `queue` size passed to `broadcast_receiver()` and the underlying mem chan's `deque` are the same size).

The thinking is, there's a length check to determine if the sender should block in `MemorySendChannel.send_nowait()`:
https://github.com/python-trio/trio/blob/master/trio/_channel.py#L147

If `.data` here proxied to `lambda: max(self._subs.values())` and the order of these branches were reversed, it might just work, no?
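As a pure thought experiment, that proxy could be a `deque` subclass whose length reports the slowest subscriber's lag (entirely hypothetical, and it pokes at trio internals):

```python
from collections import deque


class _SlowestConsumerDeque(deque):
    '''Hypothetical stand-in for ``MemoryChannelState.data``: its
    length reports how far behind the slowest broadcast subscriber
    is, so the ``len(self._state.data) < self._state.max_buffer_size``
    check in ``MemorySendChannel.send_nowait()`` would block the
    sender until that subscriber catches up.

    '''
    def __init__(self, subs: dict, *args, **kwargs):
        self._subs = subs
        super().__init__(*args, **kwargs)

    def __len__(self) -> int:
        # the max sequence number == the slowest consumer's lag
        return max(self._subs.values(), default=0)
```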
Follow up:
- use `id(self)` for `.key` instead of `uuid4()`?
- what if `._recv()` is cancelled before the first task in line receives a value? ensure that no other event-queued tasks miss that value and/or re-get their last pulled value